"""
BlueZ D-Bus manager module
--------------------------

This module contains code for the global BlueZ D-Bus object manager that is
used internally by Bleak.
"""

import sys
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    if sys.platform != "linux":
        assert False, "This backend is only available on Linux"

import asyncio
import contextlib
import logging
import os
from collections import defaultdict
from collections.abc import Callable, Coroutine, MutableMapping
from functools import partial
from typing import Any, NamedTuple, Optional, cast
from weakref import WeakKeyDictionary

from dbus_fast import AuthError, BusType, Message, MessageType, Variant, unpack_variants
from dbus_fast.aio.message_bus import MessageBus

from bleak.args.bluez import OrPatternLike
from bleak.backends.bluezdbus import defs
from bleak.backends.bluezdbus.advertisement_monitor import AdvertisementMonitor
from bleak.backends.bluezdbus.defs import (
    Device1,
    GattCharacteristic1,
    GattDescriptor1,
    GattService1,
)
from bleak.backends.bluezdbus.signals import MatchRules, add_match
from bleak.backends.bluezdbus.utils import (
    assert_reply,
    device_path_from_characteristic_path,
    extract_service_handle_from_path,
    get_dbus_authenticator,
)
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.descriptor import BleakGATTDescriptor
from bleak.backends.service import BleakGATTService, BleakGATTServiceCollection
from bleak.exc import (
    BleakBluetoothNotAvailableError,
    BleakBluetoothNotAvailableReason,
    BleakDBusError,
    BleakError,
)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Signature: (device_path, device_properties) -> None. Callbacks of this type
# are registered per adapter by BlueZManager.active_scan() and passive_scan().
AdvertisementCallback = Callable[[str, Device1], None]
"""
A callback that is called when advertisement data is received.

Args:
    arg0: The D-Bus object path of the device.
    arg1: The D-Bus properties of the device object.
"""


# Signature: (new_value) -> None. The argument is typed Optional[Any], so a
# callback must be prepared to receive None.
DevicePropertiesChangedCallback = Callable[[Optional[Any]], None]
"""
A callback that is called when the properties of a device change in BlueZ.

Args:
    arg0: The new property value.
"""


class DeviceConditionCallback(NamedTuple):
    """
    Encapsulates a :data:`DevicePropertiesChangedCallback` and the property name being watched.

    Instances are created by :meth:`BlueZManager._wait_condition` and kept in a
    per-device set for as long as that method is waiting.
    """

    callback: DevicePropertiesChangedCallback
    """
    The callback.
    """

    property_name: str
    """
    The name of the property to watch.
    """


# Signature: (device_path) -> None.
DeviceRemovedCallback = Callable[[str], None]
"""
A callback that is called when a device is removed from BlueZ.

Args:
    arg0: The D-Bus object path of the device.
"""


class DeviceRemovedCallbackAndState(NamedTuple):
    """
    Encapsulates a :data:`DeviceRemovedCallback` and some state.

    Entries are appended to ``BlueZManager._device_removed_callbacks`` by the
    scan methods and by ``BlueZManager._wait_removed``, and are later removed
    from that list by value.
    """

    callback: DeviceRemovedCallback
    """
    The callback.
    """

    adapter_path: str
    """
    The D-Bus object path of the adapter associated with the callback.
    """


# Signature: (connected) -> None.
DeviceConnectedChangedCallback = Callable[[bool], None]
"""
A callback that is called when a device's "Connected" property changes.

Args:
    arg0: The current value of the "Connected" property.
"""

# Signature: (characteristic_path, value) -> None.
CharacteristicValueChangedCallback = Callable[[str, bytes], None]
"""
A callback that is called when a characteristic's "Value" property changes.

Args:
    arg0: The D-Bus object path of the characteristic.
    arg1: The current value of the "Value" property.
"""


class DeviceWatcher(NamedTuple):
    """
    Encapsulates the callbacks registered for a single device.

    An instance is returned by :meth:`BlueZManager.add_device_watcher` and is
    passed back to :meth:`BlueZManager.remove_device_watcher` as the token
    that identifies the registration.
    """

    device_path: str
    """
    The D-Bus object path of the device.
    """

    on_connected_changed: DeviceConnectedChangedCallback
    """
    A callback that is called when a device's "Connected" property changes.
    """

    on_characteristic_value_changed: CharacteristicValueChangedCallback
    """
    A callback that is called when a characteristic's "Value" property changes.
    """


# set of org.bluez.Device1 property names that come from advertising data
# NOTE(review): not referenced in the part of the module shown here; presumably
# consulted when handling property-change signals - confirm against the user.
_ADVERTISING_DATA_PROPERTIES = {
    "AdvertisingData",
    "AdvertisingFlags",
    "ManufacturerData",
    "Name",
    "ServiceData",
    "UUIDs",
}


def get_max_write_without_response_size(char_props: GattCharacteristic1) -> int:
    """
    Computes the largest payload usable for a write-without-response.

    Args:
        char_props: The D-Bus properties of the characteristic.

    Returns:
        The characteristic's "MTU" value minus 3. When the "MTU" property is
        missing, 23 is used in its place.
    """
    # The "MTU" property only exists since BlueZ 5.62; without it, assume the
    # minimum MTU allowed by the Bluetooth spec.
    mtu = char_props.get("MTU", 23)

    # NOTE(review): the 3 bytes are presumably the ATT opcode and attribute
    # handle that precede the value - confirm against the Bluetooth spec.
    return mtu - 3


class BlueZManager:
    """
    BlueZ D-Bus object manager.

    Use :func:`bleak.backends.bluezdbus.get_global_bluez_manager` to get the global instance.
    """

    def __init__(self) -> None:
        self._bus: Optional[MessageBus] = None
        self._bus_lock = asyncio.Lock()

        # dict of object path: dict of interface name: dict of property name: property value
        self._properties: dict[str, dict[str, dict[str, Any]]] = {}

        # set of available adapters for quick lookup
        self._adapters: set[str] = set()

        # The BlueZ APIs only maps children to parents, so we need to keep maps
        # to quickly find the children of a parent D-Bus object.

        # map of device d-bus object paths to set of service d-bus object paths
        self._service_map: dict[str, set[str]] = {}
        # map of service d-bus object paths to set of characteristic d-bus object paths
        self._characteristic_map: dict[str, set[str]] = {}
        # map of characteristic d-bus object paths to set of descriptor d-bus object paths
        self._descriptor_map: dict[str, set[str]] = {}

        self._advertisement_callbacks: defaultdict[str, list[AdvertisementCallback]] = (
            defaultdict(list)
        )
        self._device_removed_callbacks: list[DeviceRemovedCallbackAndState] = []
        self._device_watchers: dict[str, set[DeviceWatcher]] = {}
        self._condition_callbacks: dict[str, set[DeviceConditionCallback]] = {}
        self._services_cache: dict[str, BleakGATTServiceCollection] = {}

    def _check_adapter(self, adapter_path: str) -> None:
        """
        Raises:
            BleakError: if adapter is not present in BlueZ
        """
        if adapter_path not in self._properties:
            raise BleakError(f"adapter '{adapter_path.split('/')[-1]}' not found")

    def _check_device(self, device_path: str) -> None:
        """
        Raises:
            BleakError: if device is not present in BlueZ
        """
        if device_path not in self._properties:
            raise BleakError(f"device '{device_path.split('/')[-1]}' not found")

    def _get_device_property(
        self, device_path: str, interface: str, property_name: str
    ) -> Any:
        self._check_device(device_path)
        device_properties = self._properties[device_path]

        try:
            interface_properties = device_properties[interface]
        except KeyError:
            raise BleakError(
                f"Interface {interface} not found for device '{device_path}'"
            )

        try:
            value = interface_properties[property_name]
        except KeyError:
            raise BleakError(
                f"Property '{property_name}' not found for '{interface}' in '{device_path}'"
            )

        return value

    async def async_init(self) -> None:
        """
        Connects to the D-Bus message bus and begins monitoring signals.

        It is safe to call this method multiple times. If the bus is already
        connected, no action is performed.
        """
        async with self._bus_lock:
            if self._bus and self._bus.connected:
                return

            self._services_cache = {}

            # We need to create a new MessageBus each time as
            # dbus-next will destroy the underlying file descriptors
            # when the previous one is closed in its finalizer.
            bus = MessageBus(bus_type=BusType.SYSTEM, auth=get_dbus_authenticator())

            try:
                # We need to call bus.disconnect() even when bus.connect() fails in
                # order to release the file handles created in the constructor.
                try:
                    await bus.connect()
                except AuthError as e:
                    raise BleakBluetoothNotAvailableError(
                        e.args[0],
                        BleakBluetoothNotAvailableReason.DENIED_BY_SYSTEM,
                    ) from e

                # Add signal listeners

                bus.add_message_handler(self._parse_msg)
                reply: Optional[Message]

                rules = MatchRules(
                    interface=defs.OBJECT_MANAGER_INTERFACE,
                    member="InterfacesAdded",
                    arg0path="/org/bluez/",
                )
                reply = await add_match(bus, rules)
                assert_reply(reply)

                rules = MatchRules(
                    interface=defs.OBJECT_MANAGER_INTERFACE,
                    member="InterfacesRemoved",
                    arg0path="/org/bluez/",
                )
                reply = await add_match(bus, rules)
                assert_reply(reply)

                rules = MatchRules(
                    interface=defs.PROPERTIES_INTERFACE,
                    member="PropertiesChanged",
                    path_namespace="/org/bluez",
                )
                reply = await add_match(bus, rules)
                assert_reply(reply)

                # get existing objects after adding signal handlers to avoid
                # race condition

                reply = await bus.call(
                    Message(
                        destination=defs.BLUEZ_SERVICE,
                        path="/",
                        member="GetManagedObjects",
                        interface=defs.OBJECT_MANAGER_INTERFACE,
                    )
                )
                assert_reply(reply)

                # dictionaries are cleared in case AddInterfaces was received first
                # or there was a bus reset and we are reconnecting
                self._properties.clear()
                self._service_map.clear()
                self._characteristic_map.clear()
                self._descriptor_map.clear()

                for path, interfaces in reply.body[0].items():
                    props = unpack_variants(interfaces)
                    self._properties[path] = props

                    if defs.ADAPTER_INTERFACE in props:
                        self._adapters.add(path)

                    service_props = cast(
                        GattService1, props.get(defs.GATT_SERVICE_INTERFACE)
                    )

                    if service_props:
                        self._service_map.setdefault(
                            service_props["Device"], set()
                        ).add(path)

                    char_props = cast(
                        GattCharacteristic1,
                        props.get(defs.GATT_CHARACTERISTIC_INTERFACE),
                    )

                    if char_props:
                        self._characteristic_map.setdefault(
                            char_props["Service"], set()
                        ).add(path)

                    desc_props = cast(
                        GattDescriptor1, props.get(defs.GATT_DESCRIPTOR_INTERFACE)
                    )

                    if desc_props:
                        self._descriptor_map.setdefault(
                            desc_props["Characteristic"], set()
                        ).add(path)

                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug("initial properties: %s", self._properties)

            except BaseException:
                # if setup failed, disconnect
                bus.disconnect()
                raise

            if self._bus:
                # Even if we are disconnected, still need to call this to
                # release file handles.
                self._bus.disconnect()

            # Everything is setup, so save the bus
            self._bus = bus

    def get_default_adapter(self) -> str:
        """
        Gets the D-Bus object path of of the first powered Bluetooth adapter.

        Returns:
            Name of the first found powered adapter on the system, i.e. "/org/bluez/hciX".

        Raises:
            BleakBluetoothNotAvailableError:
                if there are no Bluetooth Low Energy adapters or if none of the adapters are powered

        .. versionchanged:: 2.0
            Now raises :class:`BleakBluetoothNotAvailableError` instead of :class:`BleakError`.
        """
        if not any(self._adapters):
            raise BleakBluetoothNotAvailableError(
                "No Bluetooth adapters found.",
                BleakBluetoothNotAvailableReason.NO_BLUETOOTH,
            )

        ble_central_adapters = list(
            filter(
                lambda a: "central"
                in self._properties[a][defs.ADAPTER_INTERFACE]["Roles"],
                self._adapters,
            )
        )

        if not ble_central_adapters:
            raise BleakBluetoothNotAvailableError(
                "No Bluetooth adapters with BLE 'central' role found.",
                BleakBluetoothNotAvailableReason.NO_BLE_CENTRAL_ROLE,
            )

        for adapter_path in ble_central_adapters:
            if cast(
                defs.Adapter1, self._properties[adapter_path][defs.ADAPTER_INTERFACE]
            )["Powered"]:
                return adapter_path

        raise BleakBluetoothNotAvailableError(
            "No powered Bluetooth adapters found. Turn on Bluetooth and try again.",
            BleakBluetoothNotAvailableReason.POWERED_OFF,
        )

    async def active_scan(
        self,
        adapter_path: str,
        filters: dict[str, Variant],
        advertisement_callback: AdvertisementCallback,
        device_removed_callback: DeviceRemovedCallback,
    ) -> Callable[[], Coroutine[Any, Any, None]]:
        """
        Configures the advertisement data filters and starts scanning.

        Args:
            adapter_path: The D-Bus object path of the adapter to use for scanning.
            filters: A dictionary of filters to pass to ``SetDiscoveryFilter``.
            advertisement_callback:
                A callable that will be called when new advertisement data is received.
            device_removed_callback:
                A callable that will be called when a device is removed from BlueZ.

        Returns:
            An async function that is used to stop scanning and remove the filters.

        Raises:
            BleakError: if the adapter is not present in BlueZ
        """
        async with self._bus_lock:
            assert self._bus

            # If the adapter doesn't exist, then the message calls below would
            # fail with "method not found". This provides a more informative
            # error message.
            self._check_adapter(adapter_path)

            self._advertisement_callbacks[adapter_path].append(advertisement_callback)

            device_removed_callback_and_state = DeviceRemovedCallbackAndState(
                device_removed_callback, adapter_path
            )
            self._device_removed_callbacks.append(device_removed_callback_and_state)

            try:
                # Apply the filters
                reply = await self._bus.call(
                    Message(
                        destination=defs.BLUEZ_SERVICE,
                        path=adapter_path,
                        interface=defs.ADAPTER_INTERFACE,
                        member="SetDiscoveryFilter",
                        signature="a{sv}",
                        body=[filters],
                    )
                )
                assert_reply(reply)

                # Start scanning
                reply = await self._bus.call(
                    Message(
                        destination=defs.BLUEZ_SERVICE,
                        path=adapter_path,
                        interface=defs.ADAPTER_INTERFACE,
                        member="StartDiscovery",
                    )
                )
                assert_reply(reply)

                async def stop() -> None:
                    # need to remove callbacks first, otherwise we get TxPower
                    # and RSSI properties removed during stop which causes
                    # incorrect advertisement data callbacks
                    self._advertisement_callbacks[adapter_path].remove(
                        advertisement_callback
                    )
                    self._device_removed_callbacks.remove(
                        device_removed_callback_and_state
                    )

                    async with self._bus_lock:
                        assert self._bus

                        reply = await self._bus.call(
                            Message(
                                destination=defs.BLUEZ_SERVICE,
                                path=adapter_path,
                                interface=defs.ADAPTER_INTERFACE,
                                member="StopDiscovery",
                            )
                        )

                        try:
                            assert_reply(reply)
                        except BleakDBusError as ex:
                            if ex.dbus_error != "org.bluez.Error.NotReady":
                                raise
                        else:
                            # remove the filters
                            reply = await self._bus.call(
                                Message(
                                    destination=defs.BLUEZ_SERVICE,
                                    path=adapter_path,
                                    interface=defs.ADAPTER_INTERFACE,
                                    member="SetDiscoveryFilter",
                                    signature="a{sv}",
                                    body=[{}],
                                )
                            )
                            assert_reply(reply)

                return stop
            except BaseException:
                # if starting scanning failed, don't leak the callbacks
                self._advertisement_callbacks[adapter_path].remove(
                    advertisement_callback
                )
                self._device_removed_callbacks.remove(device_removed_callback_and_state)
                raise

    async def passive_scan(
        self,
        adapter_path: str,
        filters: list[OrPatternLike],
        advertisement_callback: AdvertisementCallback,
        device_removed_callback: DeviceRemovedCallback,
    ) -> Callable[[], Coroutine[Any, Any, None]]:
        """
        Configures the advertisement data filters and starts scanning.

        Args:
            adapter_path: The D-Bus object path of the adapter to use for scanning.
            filters: A list of "or patterns" to pass to ``org.bluez.AdvertisementMonitor1``.
            advertisement_callback:
                A callable that will be called when new advertisement data is received.
            device_removed_callback:
                A callable that will be called when a device is removed from BlueZ.

        Returns:
            An async function that is used to stop scanning and remove the filters.

        Raises:
            BleakError: if the adapter is not present in BlueZ
        """
        async with self._bus_lock:
            assert self._bus

            # If the adapter doesn't exist, then the message calls below would
            # fail with "method not found". This provides a more informative
            # error message.
            self._check_adapter(adapter_path)

            self._advertisement_callbacks[adapter_path].append(advertisement_callback)

            device_removed_callback_and_state = DeviceRemovedCallbackAndState(
                device_removed_callback, adapter_path
            )
            self._device_removed_callbacks.append(device_removed_callback_and_state)

            try:
                monitor = AdvertisementMonitor(filters)

                # this should be a unique path to allow multiple python interpreters
                # running bleak and multiple scanners within a single interpreter
                monitor_path = f"/org/bleak/{os.getpid()}/{id(monitor)}"

                reply = await self._bus.call(
                    Message(
                        destination=defs.BLUEZ_SERVICE,
                        path=adapter_path,
                        interface=defs.ADVERTISEMENT_MONITOR_MANAGER_INTERFACE,
                        member="RegisterMonitor",
                        signature="o",
                        body=[monitor_path],
                    )
                )

                if (
                    reply.message_type == MessageType.ERROR
                    and reply.error_name == "org.freedesktop.DBus.Error.UnknownMethod"
                ):
                    raise BleakError(
                        "passive scanning on Linux requires BlueZ >= 5.56 with --experimental enabled and Linux kernel >= 5.10"
                    )

                assert_reply(reply)

                # It is important to export after registering, otherwise BlueZ
                # won't use the monitor
                self._bus.export(monitor_path, monitor)

                async def stop() -> None:
                    # need to remove callbacks first, otherwise we get TxPower
                    # and RSSI properties removed during stop which causes
                    # incorrect advertisement data callbacks
                    self._advertisement_callbacks[adapter_path].remove(
                        advertisement_callback
                    )
                    self._device_removed_callbacks.remove(
                        device_removed_callback_and_state
                    )

                    async with self._bus_lock:
                        assert self._bus

                        self._bus.unexport(monitor_path, monitor)

                        reply = await self._bus.call(
                            Message(
                                destination=defs.BLUEZ_SERVICE,
                                path=adapter_path,
                                interface=defs.ADVERTISEMENT_MONITOR_MANAGER_INTERFACE,
                                member="UnregisterMonitor",
                                signature="o",
                                body=[monitor_path],
                            )
                        )
                        assert_reply(reply)

                return stop

            except BaseException:
                # if starting scanning failed, don't leak the callbacks
                self._advertisement_callbacks[adapter_path].remove(
                    advertisement_callback
                )
                self._device_removed_callbacks.remove(device_removed_callback_and_state)
                raise

    def add_device_watcher(
        self,
        device_path: str,
        on_connected_changed: DeviceConnectedChangedCallback,
        on_characteristic_value_changed: CharacteristicValueChangedCallback,
    ) -> DeviceWatcher:
        """
        Registers a device watcher to receive callbacks when device state
        changes or events are received.

        Args:
            device_path:
                The D-Bus object path of the device.
            on_connected_changed:
                A callback that is called when the device's "Connected"
                state changes.
            on_characteristic_value_changed:
                A callback that is called whenever a characteristic receives
                a notification/indication.

        Returns:
            A device watcher object that acts a token to unregister the watcher.

        Raises:
            BleakError: if the device is not present in BlueZ
        """
        self._check_device(device_path)

        watcher = DeviceWatcher(
            device_path, on_connected_changed, on_characteristic_value_changed
        )

        self._device_watchers.setdefault(device_path, set()).add(watcher)
        return watcher

    def remove_device_watcher(self, watcher: DeviceWatcher) -> None:
        """
        Unregisters a device watcher.

        Args:
            The device watcher token that was returned by
            :meth:`add_device_watcher`.
        """
        device_path = watcher.device_path
        self._device_watchers[device_path].remove(watcher)
        if not self._device_watchers[device_path]:
            del self._device_watchers[device_path]

    async def get_services(
        self, device_path: str, use_cached: bool, requested_services: Optional[set[str]]
    ) -> BleakGATTServiceCollection:
        """
        Builds a new :class:`BleakGATTServiceCollection` from the current state.

        Args:
            device_path:
                The D-Bus object path of the Bluetooth device.
            use_cached:
                When ``True`` if there is a cached :class:`BleakGATTServiceCollection`,
                the method will not wait for ``"ServicesResolved"`` to become true
                and instead return the cached service collection immediately.
            requested_services:
                When given, only return services with UUID that is in the list
                of requested services.

        Returns:
            A new :class:`BleakGATTServiceCollection`.

        Raises:
            BleakError: if the device is not present in BlueZ
        """
        self._check_device(device_path)

        if use_cached:
            services = self._services_cache.get(device_path)
            if services is not None:
                logger.debug("Using cached services for %s", device_path)
                return services

        await self._wait_for_services_discovery(device_path)

        services = BleakGATTServiceCollection()

        for service_path in self._service_map.get(device_path, set()):
            service_props = cast(
                GattService1,
                self._properties[service_path][defs.GATT_SERVICE_INTERFACE],
            )

            service = BleakGATTService(
                (service_path, service_props),
                extract_service_handle_from_path(service_path),
                service_props["UUID"],
            )

            if (
                requested_services is not None
                and service.uuid not in requested_services
            ):
                continue

            services.add_service(service)

            for char_path in self._characteristic_map.get(service_path, set()):
                char_props = cast(
                    GattCharacteristic1,
                    self._properties[char_path][defs.GATT_CHARACTERISTIC_INTERFACE],
                )

                char = BleakGATTCharacteristic(
                    (char_path, char_props),
                    extract_service_handle_from_path(char_path),
                    char_props["UUID"],
                    char_props["Flags"],
                    # Because `char_props` is a loop varialbe, we cannot
                    # directly bind a closure (i.e. lambda) to it;
                    # instead, we let `functools.partial` create a new
                    # function frame to close over at each iteration.
                    partial(get_max_write_without_response_size, char_props),
                    service,
                )

                services.add_characteristic(char)

                for desc_path in self._descriptor_map.get(char_path, set()):
                    desc_props = cast(
                        GattDescriptor1,
                        self._properties[desc_path][defs.GATT_DESCRIPTOR_INTERFACE],
                    )

                    desc = BleakGATTDescriptor(
                        (desc_path, desc_props),
                        int(desc_path[-4:], 16),
                        desc_props["UUID"],
                        char,
                    )

                    services.add_descriptor(desc)

        self._services_cache[device_path] = services

        return services

    def get_device_name(self, device_path: str) -> str:
        """
        Gets the value of the "Name" property for a device.

        Args:
            device_path: The D-Bus object path of the device.

        Returns:
            The current property value.

        Raises:
            BleakError: if the device is not present in BlueZ
        """
        return self._get_device_property(device_path, defs.DEVICE_INTERFACE, "Name")

    def get_device_address(self, device_path: str) -> str:
        """
        Gets the value of the "Address" property for a device.

        Args:
            device_path: The D-Bus object path of the device.

        Returns:
            The current property value.

        Raises:
            BleakError: if the device is not present in BlueZ
        """
        return self._get_device_property(device_path, defs.DEVICE_INTERFACE, "Address")

    def is_connected(self, device_path: str) -> bool:
        """
        Gets the value of the "Connected" property for a device.

        Args:
            device_path: The D-Bus object path of the device.

        Returns:
            The current property value or ``False`` if the device does not exist in BlueZ.
        """
        try:
            return self._properties[device_path][defs.DEVICE_INTERFACE]["Connected"]
        except KeyError:
            return False

    def is_paired(self, device_path: str) -> bool:
        """
        Gets the value of the "Paired" property for a device.

        Args:
            device_path: The D-Bus object path of the device.

        Returns:
            The current property value or ``False`` if the device does not exist in BlueZ.
        """
        try:
            return self._properties[device_path][defs.DEVICE_INTERFACE]["Paired"]
        except KeyError:
            return False

    async def _wait_for_services_discovery(self, device_path: str) -> None:
        """
        Waits for the device services to be discovered.

        If a disconnect happens before the completion a BleakError exception is raised.

        Raises:
            BleakError: if the device is not present in BlueZ
        """
        self._check_device(device_path)

        with contextlib.ExitStack() as stack:
            services_discovered_wait_task = asyncio.create_task(
                self._wait_condition(device_path, "ServicesResolved", True)
            )
            stack.callback(services_discovered_wait_task.cancel)

            device_disconnected_wait_task = asyncio.create_task(
                self._wait_condition(device_path, "Connected", False)
            )
            stack.callback(device_disconnected_wait_task.cancel)

            # in some cases, we can get "InterfaceRemoved" without the
            # "Connected" property changing, so we need to race against both
            # conditions
            device_removed_wait_task = asyncio.create_task(
                self._wait_removed(device_path)
            )
            stack.callback(device_removed_wait_task.cancel)

            done, _ = await asyncio.wait(
                {
                    services_discovered_wait_task,
                    device_disconnected_wait_task,
                    device_removed_wait_task,
                },
                return_when=asyncio.FIRST_COMPLETED,
            )

            # check for exceptions
            for task in done:
                task.result()

            if not done.isdisjoint(
                {device_disconnected_wait_task, device_removed_wait_task}
            ):
                raise BleakError("failed to discover services, device disconnected")

    async def _wait_removed(self, device_path: str) -> None:
        """
        Waits for the device interface to be removed.

        If the device is not present in BlueZ, this returns immediately.

        Args:
            device_path: The D-Bus object path of a Bluetooth device.
        """
        if device_path not in self._properties:
            return

        event = asyncio.Event()

        def callback(o: str) -> None:
            if o == device_path:
                event.set()

        device_removed_callback_and_state = DeviceRemovedCallbackAndState(
            callback, self._properties[device_path][defs.DEVICE_INTERFACE]["Adapter"]
        )

        with contextlib.ExitStack() as stack:
            self._device_removed_callbacks.append(device_removed_callback_and_state)
            stack.callback(
                self._device_removed_callbacks.remove, device_removed_callback_and_state
            )
            await event.wait()

    async def _wait_condition(
        self, device_path: str, property_name: str, property_value: Any
    ) -> None:
        """
        Blocks until a device property equals the requested value.

        Returns immediately if the property already has that value. Otherwise
        a condition callback is registered for the device and removed again
        when the wait finishes or is canceled.

        Args:
            device_path: The D-Bus object path of a Bluetooth device.
            property_name: The name of the property to test.
            property_value: A value to compare the current property value to.

        Raises:
            BleakError: if the device is not present in BlueZ
        """
        current = self._get_device_property(
            device_path, defs.DEVICE_INTERFACE, property_name
        )

        if current == property_value:
            # condition already holds, nothing to wait for
            return

        satisfied = asyncio.Event()

        def _on_property_changed(updated: Optional[Any]) -> None:
            """Wakes the waiter once the property reaches the target value."""
            if updated == property_value:
                satisfied.set()

        registry = self._condition_callbacks
        per_device = registry.setdefault(device_path, set())
        registration = DeviceConditionCallback(_on_property_changed, property_name)
        per_device.add(registration)

        try:
            # can be canceled
            await satisfied.wait()
        finally:
            # always unregister, and drop the per-device set once it is empty
            per_device.remove(registration)
            if not per_device:
                del registry[device_path]

    def _parse_msg(self, message: Message) -> None:
        """
        Handles callbacks from dbus_fast.

        Only signal messages are processed; any other message type is ignored.
        The signal's member name selects one of three branches:

        - ``InterfacesAdded``: caches the unpacked properties of each added
          interface and updates the service/characteristic/descriptor/adapter
          indexes. An added device interface also runs the advertisement
          callbacks.
        - ``InterfacesRemoved``: drops the cached properties and index entries
          of each removed interface and, for a device interface, runs the
          matching device removed callbacks.
        - ``PropertiesChanged``: merges the changes into the cached properties
          first and then runs advertisement callbacks, condition callbacks
          and device watchers so that they observe the updated state.

        Args:
            message: The D-Bus message to process.
        """

        # everything other than signals is ignored here
        if message.message_type != MessageType.SIGNAL:
            return

        # skip the logging call entirely unless debug output is enabled
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                "received D-Bus signal: %s.%s (%s): %s",
                message.interface,
                message.member,
                message.path,
                message.body,
            )

        # type hints for the values unpacked from message.body below
        obj_path: str
        interfaces_and_props: dict[str, dict[str, Variant]]
        interfaces: list[str]
        interface: str
        changed: dict[str, Variant]
        invalidated: list[str]

        if message.member == "InterfacesAdded":
            obj_path, interfaces_and_props = message.body

            for interface, props in interfaces_and_props.items():
                # cache the unpacked properties as object path -> interface
                unpacked_props = unpack_variants(props)
                self._properties.setdefault(obj_path, {})[interface] = unpacked_props

                if interface == defs.GATT_SERVICE_INTERFACE:
                    # index the service path under its owning device path
                    service_props = cast(GattService1, unpacked_props)
                    self._service_map.setdefault(service_props["Device"], set()).add(
                        obj_path
                    )
                elif interface == defs.GATT_CHARACTERISTIC_INTERFACE:
                    # index the characteristic path under its service path
                    char_props = cast(GattCharacteristic1, unpacked_props)
                    self._characteristic_map.setdefault(
                        char_props["Service"], set()
                    ).add(obj_path)
                elif interface == defs.GATT_DESCRIPTOR_INTERFACE:
                    # index the descriptor path under its characteristic path
                    desc_props = cast(GattDescriptor1, unpacked_props)
                    self._descriptor_map.setdefault(
                        desc_props["Characteristic"], set()
                    ).add(obj_path)

                elif interface == defs.ADAPTER_INTERFACE:
                    self._adapters.add(obj_path)

                # If this is a device and it has advertising data properties,
                # then it should mean that this device just started advertising.
                # Previously, we just relied on RSSI updates to determine if
                # a device was actually advertising, but we were missing "slow"
                # devices that only advertise once and then go to sleep for a while.
                elif interface == defs.DEVICE_INTERFACE:
                    self._run_advertisement_callbacks(
                        obj_path, cast(Device1, unpacked_props)
                    )
        elif message.member == "InterfacesRemoved":
            obj_path, interfaces = message.body

            for interface in interfaces:
                # the object or interface may not be cached, which is fine
                try:
                    del self._properties[obj_path][interface]
                except KeyError:
                    pass

                if interface == defs.ADAPTER_INTERFACE:
                    try:
                        self._adapters.remove(obj_path)
                    except KeyError:
                        pass
                elif interface == defs.DEVICE_INTERFACE:
                    # forget the cached services and the service index entry
                    # of the removed device
                    self._services_cache.pop(obj_path, None)
                    try:
                        del self._service_map[obj_path]
                    except KeyError:
                        pass

                    # only notify callbacks whose adapter path is a prefix of
                    # the removed object path
                    for callback, adapter_path in self._device_removed_callbacks:
                        if obj_path.startswith(adapter_path):
                            callback(obj_path)
                elif interface == defs.GATT_SERVICE_INTERFACE:
                    # the device path is the service path with its last
                    # component stripped
                    device_path = obj_path[: obj_path.rfind("/")]

                    try:
                        self._service_map[device_path].remove(obj_path)
                    except KeyError:
                        pass

                    # drop the characteristic index entry of the removed service
                    try:
                        del self._characteristic_map[obj_path]
                    except KeyError:
                        pass
                elif interface == defs.GATT_CHARACTERISTIC_INTERFACE:
                    # drop the descriptor index entry of the removed
                    # characteristic
                    try:
                        del self._descriptor_map[obj_path]
                    except KeyError:
                        pass

            # Remove empty properties when all interfaces have been removed.
            # This avoids wasting memory for people who have noisy devices
            # with private addresses that change frequently.
            if obj_path in self._properties and not self._properties[obj_path]:
                del self._properties[obj_path]
        elif message.member == "PropertiesChanged":
            interface, changed, invalidated = message.body
            message_path = message.path
            assert message_path is not None

            try:
                self_interface = self._properties[message_path][interface]
            except KeyError:
                # This can happen during initialization. The "PropertiesChanged"
                # handler is attached before "GetManagedObjects" is called
                # and so self._properties may not yet be populated.
                # This is not a problem. We just discard the property value
                # since "GetManagedObjects" will return a newer value.
                pass
            else:
                # update self._properties first

                self_interface.update(unpack_variants(changed))

                for name in invalidated:
                    try:
                        del self_interface[name]
                    except KeyError:
                        # sometimes BlueZ tries to remove properties
                        # that were never added
                        pass

                # then call any callbacks so they will be called with the
                # updated state

                if interface == defs.DEVICE_INTERFACE:
                    # handle advertisement watchers
                    device_path = message_path

                    self._run_advertisement_callbacks(
                        device_path, cast(Device1, self_interface)
                    )

                    # handle device condition watchers
                    callbacks = self._condition_callbacks.get(device_path)
                    if callbacks:
                        for item in callbacks:
                            name = item.property_name
                            if name in changed:
                                # pass the cached value; .get() yields None if
                                # the property is no longer cached
                                item.callback(self_interface.get(name))

                    # handle device connection change watchers
                    if "Connected" in changed:
                        new_connected = self_interface["Connected"]
                        watchers = self._device_watchers.get(device_path)
                        if watchers:
                            # callbacks may remove the watcher, hence the copy
                            for watcher in watchers.copy():
                                watcher.on_connected_changed(new_connected)

                elif interface == defs.GATT_CHARACTERISTIC_INTERFACE:
                    # handle characteristic value change watchers
                    if "Value" in changed:
                        new_value = self_interface["Value"]
                        device_path = device_path_from_characteristic_path(message_path)
                        watchers = self._device_watchers.get(device_path)
                        if watchers:
                            # NOTE(review): unlike the "Connected" loop above,
                            # this iterates the live collection without a
                            # copy - confirm that watchers are never removed
                            # from within on_characteristic_value_changed
                            for watcher in watchers:
                                watcher.on_characteristic_value_changed(
                                    message_path, new_value
                                )

    def _run_advertisement_callbacks(self, device_path: str, device: Device1) -> None:
        """
        Invokes every advertisement callback registered for the device's adapter.

        Each callback is handed its own shallow copy of the device properties.

        Args:
            device_path: The D-Bus object path of the remote device.
            device: The current D-Bus properties of the device.
        """
        # callbacks are looked up by the adapter the device belongs to
        registered = self._advertisement_callbacks[device["Adapter"]]
        for notify in registered:
            notify(device_path, device.copy())


# Maps an asyncio event loop to the BlueZManager created for it (see
# get_global_bluez_manager). Keys are held weakly, so an entry goes away once
# its event loop is garbage collected.
_global_instances: MutableMapping[Any, BlueZManager] = WeakKeyDictionary()


async def get_global_bluez_manager() -> BlueZManager:
    """
    Returns the initialized BlueZ manager that belongs to the running event loop.

    A new manager is created and registered the first time this is called for
    a given event loop. ``async_init()`` is awaited on every call before the
    instance is returned.
    """

    running_loop = asyncio.get_running_loop()

    manager = _global_instances.get(running_loop)
    if manager is None:
        # first request on this event loop
        manager = BlueZManager()
        _global_instances[running_loop] = manager

    await manager.async_init()

    return manager
